/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */


package org.apache.sysds.runtime.matrix.data;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.random.Well1024a;
import org.apache.sysds.hops.DataGenOp;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.CounterBasedPRNGenerator;
import org.apache.sysds.runtime.util.IPRNGenerator;
import org.apache.sysds.runtime.util.NormalPRNGenerator;
import org.apache.sysds.runtime.util.PRNGenerator;
import org.apache.sysds.runtime.util.PhiloxNormalCBPRNGenerator;
import org.apache.sysds.runtime.util.PhiloxUniformCBPRNGenerator;
import org.apache.sysds.runtime.util.PoissonPRNGenerator;
import org.apache.sysds.runtime.util.UniformPRNGenerator;
import org.apache.sysds.runtime.util.UtilFunctions;


public class LibMatrixDatagen 
{
	private static final Log LOG = LogFactory.getLog(LibMatrixDatagen.class.getName());
	private static final long PAR_NUMCELL_THRESHOLD = 512*1024; //Min 500k elements
	
	private static IDSequence _seqRandInput = new IDSequence(); 
	
	private LibMatrixDatagen() {
		//prevent instantiation via private constructor
	}

	public static boolean isShortcutRandOperation( double min, double max, double sparsity, RandomMatrixGenerator.PDF pdf )
	{
		return pdf == RandomMatrixGenerator.PDF.UNIFORM
			   && (  ( min == 0.0 && max == 0.0 ) //all zeros
				   ||( sparsity==1.0d && min == max )); //equal values
	}

	public static double updateSeqIncr(double seq_from, double seq_to, double seq_incr) {
		//handle default 1 to -1 for special case of from>to
		return (seq_from>seq_to && seq_incr==1)? -1 : seq_incr;
	}

	public static String generateUniqueSeedPath( String basedir ) {
		return basedir + "tmp" + _seqRandInput.getNextID() + ".randinput";
	}
	
	/**
	 * A matrix of random numbers is generated by using multiple seeds, one for each 
	 * block. Such block-level seeds are produced via Well equidistributed long-period linear 
	 * generator (Well1024a). For a given seed, this function sets up the block-level seeds.
	 * 
	 * This function is invoked from both CP (RandCPInstruction.processInstruction()) 
	 * as well as MR (RandMR.java while setting up the Rand job).
	 * 
	 * @param seed seed for random generator
	 * @return Well1024a pseudo-random number generator
	 */
	public static Well1024a setupSeedsForRand(long seed) 
	{
		long lSeed = (seed == DataGenOp.UNSPECIFIED_SEED ? DataGenOp.generateRandomSeed() : seed);
		LOG.trace("Setting up RandSeeds with initial seed = "+lSeed+".");

		Random random=new Random(lSeed);
		Well1024a bigrand=new Well1024a();
		//random.setSeed(lSeed);
		int[] seeds=new int[32];
		for(int s=0; s<seeds.length; s++)
			seeds[s]=random.nextInt();
		bigrand.setSeed(seeds);
		
		return bigrand;
	}

	@Deprecated
	public static LongStream computeNNZperBlock(long nrow, long ncol, int blen, double sparsity) {
		long lnumBlocks = (long) (Math.ceil((double)nrow/blen) * Math.ceil((double)ncol/blen));
		
		//sanity check max number of blocks (before cast to avoid overflow)
		if ( lnumBlocks > Integer.MAX_VALUE ) {
			throw new DMLRuntimeException("A random matrix of size [" + nrow + "," + ncol + "] can not be created. "
					+ "Number of blocks ("+lnumBlocks+") exceeds the maximum integer size. Try to increase the block size.");
		}

		int numBlocks = (int) lnumBlocks;
		int numColBlocks = (int) Math.ceil((double)ncol/blen);
		long nnz = (long) Math.ceil (nrow * (ncol*sparsity));
		
		if( nnz < numBlocks ) {
			//#1: ultra-sparse random number generation
			//nnz per block: 1 with probability P = nnz/numBlocks, 0 with probability 1-P
			//(note: this is an unbiased generator that, however, will never generate more than 
			//one non-zero per block, but it uses weights to account for different block sizes)
			double P = (double) nnz / numBlocks;
			Random runif = new Random(System.nanoTime());
			return LongStream.range(0, numBlocks).map( i -> {
				double lP = P / (blen*blen) *
					UtilFunctions.computeBlockSize(nrow, 1 + i / numColBlocks, blen) *
					UtilFunctions.computeBlockSize(ncol, 1 + i % numColBlocks, blen);
				return (runif.nextDouble() <= lP) ? 1 : 0;
			});
		}
		else {
			//#2: dense/sparse random number generation
			//nnz per block: lrlen * lclen * sparsity (note: this is a biased generator 
			//that might actually create fewer but never more non zeros than expected)
			return LongStream.range(0, numBlocks).map( i -> (long)(sparsity * 
				UtilFunctions.computeBlockSize(nrow, 1 + i / numColBlocks, blen) *
				UtilFunctions.computeBlockSize(ncol, 1 + i % numColBlocks, blen)));
		}
	}

	public static RandomMatrixGenerator createRandomMatrixGenerator(String pdfStr, int r, int c, int blen, double sp, double min, double max, String distParams) {
		RandomMatrixGenerator.PDF pdf = RandomMatrixGenerator.PDF.valueOf(pdfStr.toUpperCase());
		RandomMatrixGenerator rgen = null;
		switch (pdf) {
			case UNIFORM:
			case CB_UNIFORM:
				rgen = new RandomMatrixGenerator(pdf, r, c, blen, sp, min, max);
				break;
			case NORMAL:
			case CB_NORMAL:
				rgen = new RandomMatrixGenerator(pdf, r, c, blen, sp);
				break;
			case POISSON:
				double mean = Double.NaN;
				try {
					mean = Double.parseDouble(distParams);
				} catch (NumberFormatException e) {
					throw new DMLRuntimeException("Failed to parse Poisson distribution parameter: " + distParams);
				}
				rgen = new RandomMatrixGenerator(pdf, r, c, blen, sp, min, max, mean);
				break;
			default:
				throw new DMLRuntimeException("Unsupported probability distribution \"" + pdf + "\" in rand() -- it must be one of \"uniform\", \"normal\", or \"poisson\"");
		}
		return rgen;
	}
	
	/**
	 * Function to generate a matrix of random numbers. This is invoked both from CP as well as from MR. In case of CP,
	 * it generates an entire matrix block-by-block. A <code>bigrand</code> is passed so that block-level seeds are
	 * generated internally. In case of MR, it generates a single block for given block-level seed <code>bSeed</code>.
	 * 
	 * When pdf="uniform", cell values are drawn from uniform distribution in range <code>[min,max]</code>.
	 * 
	 * When pdf="normal", cell values are drawn from standard normal distribution N(0,1). The range of generated values
	 * will always be (-Inf,+Inf).
	 * 
	 * @param out     output matrix block
	 * @param rgen    random matrix generator
	 * @param bigrand Well1024a pseudo-random number generator
	 * @param bSeed   seed for random generator
	 */
	public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed ) {
		boolean invokedFromCP = (bigrand != null);
		int rows = rgen._rows;
		int cols = rgen._cols;
		int blen = rgen._blocksize;
		double sparsity = rgen._sparsity;
		
		if(out instanceof CompressedMatrixBlock)
			throw new DMLRuntimeException("Invalid to use compressed matrix block as output");

		// sanity check valid dimensions and sparsity
		checkMatrixDimensionsAndSparsity(rows, cols, sparsity);
		
		/*
		 * Setup min and max for distributions other than "uniform". Min and Max
		 * are set up in such a way that the usual logic of
		 * (max-min)*prng.nextDouble() is still valid. This is done primarily to
		 * share the same code across different distributions.
		 */
		double min = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? rgen._min : 0;
		double max = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? rgen._max : 1;
		
		// Special case shortcuts for efficiency
		if ( rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM) {
			if ( min == 0.0 && max == 0.0 || sparsity == 0 ) { //all zeros
				out.reset(rows, cols, true);
				return;
			} 
			else if( sparsity==1.0d && (min == max  //equal values, dense
					|| (Double.isNaN(min) && Double.isNaN(max))) ) { //min == max == NaN
				out.reset(rows, cols, min); 
				return;
			}
		}
		
		// Determine the sparsity of output matrix
		// if invoked from CP: estimated NNZ is for entire matrix (nnz=0, if 0 initialized)
		// if invoked from MR: estimated NNZ is for one block
		final long estnnz = (long) Math.ceil((min==0.0 && max==0.0) ? 0 : sparsity*rows*cols);
		boolean lsparse = MatrixBlock.evalSparseFormatInMemory( rows, cols, estnnz );
		out.reset(rows, cols, lsparse, estnnz);
		
		// allocate memory, incl sparse row allocation if safe
		out.allocateBlock();
		
		//prepare rand internal parameters
		int nrb = (int) Math.ceil((double)rows/blen);
		int ncb = (int) Math.ceil((double)cols/blen);
		long[] seeds = invokedFromCP ? generateSeedsForCP(bigrand, nrb, ncb) : null;
		
		genRandomNumbers(invokedFromCP, 0, nrb, 0, ncb, out, rgen, bSeed, seeds);
		
		out.recomputeNonZeros();
	}
	
	/**
	 * Function to generate a matrix of random numbers. This is invoked both
	 * from CP as well as from MR. In case of CP, it generates an entire matrix
	 * block-by-block. A <code>bigrand</code> is passed so that block-level
	 * seeds are generated internally. In case of MR, it generates a single
	 * block for given block-level seed <code>bSeed</code>.
	 * 
	 * When pdf="uniform", cell values are drawn from uniform distribution in
	 * range <code>[min,max]</code>.
	 * 
	 * When pdf="normal", cell values are drawn from standard normal
	 * distribution N(0,1). The range of generated values will always be
	 * (-Inf,+Inf).
	 * 
	 * 
	 * @param out output matrix block
	 * @param rgen random matrix generator
	 * @param bigrand Well1024a pseudo-random number generator
	 * @param bSeed seed for random generator
	 * @param k ?
	 */
	public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed, int k ) {
		int rows = rgen._rows;
		int cols = rgen._cols;
		int blen = rgen._blocksize;
		double sparsity = rgen._sparsity;
		
		if(out instanceof CompressedMatrixBlock)
			throw new DMLRuntimeException("Invalid to use compressed matrix block as output");

		//sanity check valid dimensions and sparsity
		checkMatrixDimensionsAndSparsity(rows, cols, sparsity);
		
		/*
		 * Setup min and max for distributions other than "uniform". Min and Max
		 * are set up in such a way that the usual logic of
		 * (max-min)*prng.nextDouble() is still valid. This is done primarily to
		 * share the same code across different distributions.
		 */
		double min = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? rgen._min : 0;
		double max = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? rgen._max : 1;
		
		//determine the sparsity of output matrix (multi-threaded always invoked from CP):
		//estimated NNZ is for entire matrix (nnz=0, if 0 initialized)
		final long estnnz = ((min==0.0 && max==0.0) ? 0 : (long)(sparsity * rows * cols));
		boolean lsparse = MatrixBlock.evalSparseFormatInMemory( rows, cols, estnnz );
		
		//fallback to sequential if single rowblock or too few cells or if MatrixBlock is not thread safe
		if( k<=1 || (rows <= blen && lsparse) || (long)rows*cols < PAR_NUMCELL_THRESHOLD 
			|| !MatrixBlock.isThreadSafe(lsparse) ) {
			generateRandomMatrix(out, rgen, bigrand, bSeed);
			return;
		}

		//special case shortcuts for efficiency
		if ( rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM) {
			if ( min == 0.0 && max == 0.0 ) { //all zeros
				out.reset(rows, cols, false);
				return;
			} 
			else if( sparsity==1.0d && min == max ) { //equal values
				out.reset(rows, cols, min); 
				return;
			}
		}
		
		// allocate memory, incl sparse row allocation if safe
		out.reset(rows, cols, lsparse, estnnz);
		out.allocateBlock();
		
		int nrb = (int) Math.ceil((double)rows/blen);
		int ncb = (int) Math.ceil((double)cols/blen);
		
		//default: parallelization over row blocks, fallback to parallelization
		//over column blocks if possible and necessary (higher degree of par)
		boolean parcol = (!out.sparse && nrb<k && ncb>nrb);
		int parnb = parcol ? ncb : nrb;
		
		//generate seeds independent of parallelizations
		long[] seeds = generateSeedsForCP(bigrand, nrb, ncb);
		long nnz = 0;
		ExecutorService pool = CommonThreadPool.get(k);
		try {
			ArrayList<RandTask> tasks = new ArrayList<>();
			int blklen = ((int)(Math.ceil((double)parnb/k)));
			for( int i=0; i<k & i*blklen<parnb; i++ ) {
				int rl = parcol ? 0 : i*blklen; 
				int ru = parcol ? nrb : Math.min((i+1)*blklen, parnb);
				int cl = parcol ? i*blklen : 0; 
				int cu = parcol ? Math.min((i+1)*blklen, parnb) : ncb;
				long[] lseeds = sliceSeedsForCP(seeds, rl, ru, cl, cu, nrb, ncb);
				tasks.add(new RandTask(rl, ru, cl, cu, out, rgen, bSeed, lseeds) );
			}

			for(Future<Long> rc : pool.invokeAll(tasks)) 
				nnz += rc.get();
		} 
		catch (Exception e) {
			throw new DMLRuntimeException(e);
		}
		finally{
			pool.shutdown();
		}
		
		out.setNonZeros(nnz);
	}
	
	/**
	 * Method to generate a sequence according to the given parameters. The
	 * generated sequence is always in dense format.
	 * 
	 * Both end points specified <code>from</code> and <code>to</code> must be
	 * included in the generated sequence i.e., [from,to] both inclusive. Note
	 * that, <code>to</code> is included only if (to-from) is perfectly
	 * divisible by <code>incr</code>.
	 * 
	 * For example, seq(0,1,0.5) generates (0.0 0.5 1.0) 
	 *      whereas seq(0,1,0.6) generates (0.0 0.6) but not (0.0 0.6 1.0)
	 * 
	 * @param out output matrix block
	 * @param from lower end point
	 * @param to upper end point
	 * @param incr increment value
	 */
	public static void generateSequence(MatrixBlock out, double from, double to, double incr) {
		//check valid increment value
		if( (from > to && incr > 0) || incr == 0 )
			throw new DMLRuntimeException("Wrong sequence increment: from="+from+", to="+to+ ", incr="+incr);
		
		//prepare output matrix
		int rows = (int) UtilFunctions.getSeqLength(from, to, incr);
		int cols = 1; // sequence vector always dense
		out.reset(rows, cols, false);
		out.allocateDenseBlock();
	
		//compute sequence data
		double[] c = out.getDenseBlockValues();
		double cur = from;
		for(int i=0; i < rows; i++) {
			c[i] = cur;
			cur += incr;
		}
		
		if((incr > 0 && from > 0) || (incr < 0 && from < 0))
			out.nonZeros = rows;
		else if(from == 0 && incr != 0)
			out.nonZeros = rows - 1;
		else
			out.recomputeNonZeros();
	}

	/**
	 * Generates a sample of size <code>size</code> from a range of values [1,range]. <code>replace</code> defines if
	 * sampling is done with or without replacement.
	 * 
	 * @param out     output matrix block
	 * @param range   range upper bound
	 * @param size    sample size
	 * @param replace if true, sample with replacement
	 * @param seed    seed for random generator
	 */
	public static void generateSample(MatrixBlock out, long range, int size, boolean replace, long seed) {
		//set meta data and allocate dense block
		out.reset(size, 1, false);
		double[] a = out.allocateBlock().getDenseBlockValues();
		seed = (seed == -1 ? System.nanoTime() : seed);
		
		if ( !replace ) 
		{
			// reservoir sampling
			for(int i=0; i < size; i++) 
				a[i] = i + 1;
			
			Random rand = new Random(seed);
			for(int i=size+1; i <= range; i++) {
				if(rand.nextInt(i) < size)
					a[rand.nextInt(size)] = i;
			}
			
			// randomize the sample (Algorithm P from Knuth's ACP)
			// needed especially when the difference between range and size is small)
			for( int i = 0; i < size-1; i++ ) {
				//generate index in i <= j < size
				int j = rand.nextInt(size - i) + i;
				//swap i^th and j^th entry
				double tmp = a[i];
				a[i] = a[j];
				a[j] = tmp;
			}
		}
		else {
			Random r = new Random(seed);
			for(int i=0; i < size; i++) 
				a[i] = 1+nextLong(r, range);
		}
		
		out.setNonZeros(size);
		out.examSparsity();
	}

	private static long[] generateSeedsForCP(Well1024a bigrand, int nrb, int ncb)
	{
		int numBlocks = nrb * ncb;
		long[] seeds = new long[numBlocks];
		for( int l = 0; l < numBlocks; l++ )
			seeds[l] = bigrand.nextLong();
		
		return seeds;
	}

	private static long[] sliceSeedsForCP(long[] seeds, int rl, int ru, int cl, int cu, int nrb, int ncb)
	{
		int numBlocks = (ru-rl) * (cu-cl);
		long[] lseeds = new long[numBlocks];
		for( int i = rl; i < ru; i++ )
			System.arraycopy(seeds, i*ncb+cl, lseeds, (i-rl)*(cu-cl), cu-cl);
		
		return lseeds;
	}

	private static void genRandomNumbers(boolean invokedFromCP, int rl, int ru, int cl, int cu, MatrixBlock out, RandomMatrixGenerator rgen, long bSeed, long[] seeds) {
		int rows = rgen._rows;
		int cols = rgen._cols;
		int blen = rgen._blocksize;
		double sparsity = rgen._sparsity;
		double min = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? rgen._min : 0;
		double max = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? rgen._max : 1;
		double range = max - min;
		int clen = out.clen;
		int estnnzRow = (int)(sparsity * cols);
		
		int nrb = (int) Math.ceil((double)rows/blen);
		int ncb = (int) Math.ceil((double)cols/blen);
		int counter = 0;

		long[] ctr = {0, 0, 0, 0}; // Counter based RNG counter

		// Setup Pseudo Random Number Generator for cell values based on 'pdf'.
		IPRNGenerator valuePRNG = rgen._valuePRNG;
		if (valuePRNG == null) {
			switch (rgen._pdf) {
				case UNIFORM: valuePRNG = new UniformPRNGenerator(); break;
				case NORMAL:  valuePRNG = new NormalPRNGenerator(); break;
				case POISSON: valuePRNG = new PoissonPRNGenerator(); break;
				case CB_UNIFORM: valuePRNG = new PhiloxUniformCBPRNGenerator(); break;
				case CB_NORMAL: valuePRNG = new PhiloxNormalCBPRNGenerator(); break;
				default:
					throw new DMLRuntimeException("Unsupported distribution function for Rand: " + rgen._pdf);
			}
		}
		
		//preallocate prng for non-zero entries
		UniformPRNGenerator nnzPRNG = new UniformPRNGenerator(0);
		
		//preallocate sparse rows if safe
		if( out.sparse && estnnzRow > 0 && cl==0 && cu==ncb )
			for( int i=rl*blen; i<Math.min(ru*blen, rows); i++ )
				out.sparseBlock.allocate(i, estnnzRow);
		
		// loop through row-block indices
		for(int rbi = rl; rbi < ru; rbi++) {
			int blockrows = (rbi == nrb-1 ? (rows-rbi*blen) : blen);
			int rowoffset = rbi*blen;

			// loop through column-block indices
			for(int cbj = cl; cbj < cu; cbj++) {
				int blockcols = (cbj == ncb-1 ? (cols-cbj*blen) : blen);
				int coloffset = cbj*blen;
				
				// select the appropriate block-level seed and init PRNG
				long seed = !invokedFromCP ?  bSeed : seeds[counter++];
				valuePRNG.setSeed(seed);
				
				// Initialize the PRNGenerator for determining cells that contain a non-zero value
				// Note that, "pdf" parameter applies only to cell values and the individual cells 
				// are always selected uniformly at random.
				// Also note that we cannot use the same seed here, because for ultra-sparse generation
				// the number of calls to the valuePRNG and nnzPRNG are the same, thus creating correlated
				// outcomes (bias toward the end of the value range)

				if (valuePRNG instanceof CounterBasedPRNGenerator) {
					nnzPRNG.setSeed((long)(((CounterBasedPRNGenerator) valuePRNG).getDoubles(ctr, 1)[0]*Long.MAX_VALUE ));
				} else {
					nnzPRNG.setSeed((long)(((PRNGenerator) valuePRNG).nextDouble()*Long.MAX_VALUE));
				}

				boolean localSparse = sparsity < 1 && MatrixBlock.evalSparseFormatInMemory(
					blockrows, blockcols, (long)(sparsity*blockrows*blockcols));
				if ( localSparse) {
					SparseBlock c = out.sparseBlock;
					if(c == null){
						out.allocateSparseRowsBlock();
						out.sparse = true; //otherwise ignored
						c = out.sparseBlock;
					}
					if (valuePRNG instanceof PRNGenerator) {
						genSparse(c, clen, blockrows, blockcols, rowoffset, coloffset,
								sparsity, estnnzRow, min, range, (PRNGenerator) valuePRNG, nnzPRNG);
					}
				}
				else {
					if (sparsity == 1.0) {
						genFullyDense(out.getDenseBlock(), blockrows, blockcols,
							rowoffset, coloffset, min, range, valuePRNG, ctr);
					}
					else {
						if (valuePRNG instanceof PRNGenerator) {
							genDense(out, clen, blockrows, blockcols, rowoffset, coloffset,
									sparsity, estnnzRow, min, range, (PRNGenerator) valuePRNG, nnzPRNG);
						}

					}
				} // sparse or dense 
			} // cbj
		} // rbi
	}
	
	private static void genSparse(SparseBlock c, int clen, int blockrows, int blockcols, int rowoffset, int coloffset,
		double sparsity, int estnnzRow, double min, double range, PRNGenerator valuePRNG, UniformPRNGenerator nnzPRNG)
	{
		// Prob [k-1 zeros before a nonzero] = Prob [k-1 < log(uniform)/log(1-p) < k] = p*(1-p)^(k-1), where p=sparsity
		double log1mp = Math.log(1-sparsity);
		long idx = 0;  // takes values in range [1, blen*blen] (both ends including)
		long blocksize = blockrows*blockcols;
		while(idx < blocksize) {
			//compute skip to next index
			idx = idx + (long) Math.ceil(Math.log(nnzPRNG.nextDouble())/log1mp);
			//check blocksize and save int casts
			if ( idx > blocksize)
				break;
			// translate idx into (r,c) within the block
			int rix = (int)(idx-1)/blockcols;
			int cix = (int)(idx-1)%blockcols;
			double val = min + (range * valuePRNG.nextDouble());
			c.allocate(rowoffset+rix, estnnzRow, clen);
			c.append(rowoffset+rix, coloffset+cix, val);
		}
	}
	
	private static void genDense(MatrixBlock out, int clen, int blockrows, int blockcols, int rowoffset, int coloffset,
		double sparsity, int estnnzRow, double min, double range, PRNGenerator valuePRNG, UniformPRNGenerator nnzPRNG)
	{
		if (out.sparse ) {
			/* This case evaluated only when this function is invoked from CP. 
			 * In this case:
			 *     sparse=true -> entire matrix is in sparse format and hence denseBlock=null
			 *     localSparse=false -> local block is dense, and hence on MR side a denseBlock will be allocated
			 * i.e., we need to generate data in a dense-style but set values in sparseRows
			 * 
			 */
			// In this case, entire matrix is in sparse format but the current block is dense
			SparseBlock c = out.sparseBlock;
			for(int ii=0; ii < blockrows; ii++) {
				for(int jj=0; jj < blockcols; jj++) {
					if(nnzPRNG.nextDouble() <= sparsity) {
						double val = min + (range * valuePRNG.nextDouble());
						c.allocate(ii+rowoffset, estnnzRow, clen);
						c.append(ii+rowoffset, jj+coloffset, val);
					}
				}
			}
		}
		else {
			DenseBlock c = out.getDenseBlock();
			for(int ii = 0; ii < blockrows; ii++) {
				double[] cvals = c.values(rowoffset+ii);
				int cix = c.pos(rowoffset+ii, coloffset);
				for(int jj = 0; jj < blockcols; jj++)
					if(nnzPRNG.nextDouble() <= sparsity)
						cvals[cix+jj] =  min + (range * valuePRNG.nextDouble());
			}
		}
	}
	
	private static void genFullyDense(DenseBlock c, int blockrows, int blockcols, int rowoffset, int coloffset, 
		double min, double range, IPRNGenerator valuePRNG, long[] ctr)
	{
		Iterator<Double> rngStream;
		if (valuePRNG instanceof PRNGenerator) {
			rngStream = Stream.generate(() -> min + (range * ((PRNGenerator) valuePRNG).nextDouble())).iterator();
		} else if (valuePRNG instanceof CounterBasedPRNGenerator) {
			rngStream = Arrays.stream(((CounterBasedPRNGenerator)valuePRNG).getDoubles(ctr, blockrows * blockcols)).map(i -> min + (range * i)).iterator();
		} else {
			throw new DMLRuntimeException("Unsupported PRNGenerator for fully dense generation");
		}
		for (int i = rowoffset; i < rowoffset + blockrows; i++) {
			double[] cvals = c.values(i);
			int cix = c.pos(i, coloffset);
			for (int j = 0; j < blockcols; j++) {
				cvals[cix + j] = rngStream.next();
			}
		}
	}

	private static void checkMatrixDimensionsAndSparsity(int rows, int cols, double sp) {
		if( rows < 0 || cols < 0 || sp < 0 || sp > 1)
			throw new DMLRuntimeException("Invalid matrix characteristics: "+rows+"x"+cols+", "+sp);
	}
	
	// modified version of java.util.nextInt
	private static long nextLong(Random r, long n) {
		if (n <= 0)
			throw new IllegalArgumentException("n must be positive");
		long bits, val;
		do {
			bits = (r.nextLong() << 1) >>> 1;
			val = bits % n;
		} while (bits - val + (n-1) < 0L);
		return val;
	}

	private static class RandTask implements Callable<Long> 
	{
		private int _rl = -1;
		private int _ru = -1;
		private int _cl = -1;
		private int _cu = -1;
		private MatrixBlock _out = null;
		private RandomMatrixGenerator _rgen = new RandomMatrixGenerator();
		private long _bSeed = 0;
		private long[] _seeds = null;
		
		public RandTask(int rl, int ru, int cl, int cu, MatrixBlock out, RandomMatrixGenerator rgen, long bSeed, long[] seeds) {
			_rl = rl;
			_ru = ru;
			_cl = cl;
			_cu = cu;
			_out = out;
			_rgen.init(rgen._pdf, rgen._rows, rgen._cols, rgen._blocksize, rgen._sparsity, rgen._min, rgen._max, rgen._mean);
			_bSeed = bSeed;
			_seeds = seeds;
		}

		@Override
		public Long call() {
			//execute rand operations (with block indexes)
			genRandomNumbers(true, _rl, _ru, _cl, _cu, _out, _rgen, _bSeed, _seeds);
			
			//thread-local maintenance of non-zero values
			int blen =_rgen._blocksize;
			int rl = _rl*blen;
			int ru = Math.min(_ru*blen,_rgen._rows);
			int cl = _cl*blen;
			int cu = Math.min(_cu*blen, _rgen._cols);
			return _rgen.isFullyDense() ? (long)(ru-rl) * (cu-cl) :
				_out.recomputeNonZeros(rl, ru-1, cl, cu-1);
		}
	}
}
